写在前面 以下内容引用自 Minitorch Write Up ,本博客相关的工程实现在 https://github.com/Tokisakix/Terox/ 。
大二上的时候,我就在构思怎么样去写一个类似 Pytorch 的软件。当时大概把三大学习都学的差不多了,就想着自己去实现一些 AI 的组件,渐渐的便想着去了解一下 Pytorch 的代码及其背后的运行原理,只可惜因为时间安排和自身能力的原因而不能立刻着手去学习。
期末后剑圣跟我说到华为那边编写深度学习框架底层算子的事情,寒假时间多了,那确实可以把这件事情捡起来。摸索后我发现国外有两个 Lab 可以来帮助学习深度学习框架,一个是 CMU 的公开课程,一个是 Minitorch 。
CMU 的公开课程 是 CMU 在 2022 年 9 月开设的一门课程,讲师是 J.Zico Kolter 和陈天奇。这是一门注重实践的课程,课后作业主要是逐步实现一个类似于PyTorch的深度学习框架(类似 Pytorch 和 TensorFlow)。
Minitorch 是一个 Torch API 的纯 Python 重新实现,展示了从零开始构建一个张量和自动微分库。最终得到的库能够运行类 Torch 代码。项目创建者为康奈尔大学副教授(pre-tenure)Alexander Rush,这是一门硕士课程。
其中互联网上的评价为 CMU 的公开课程框架性好,易上手;Minitorch 更全面,同时也更难 。抉择之下,最后选择了 Minitorch 为我准备学习的课程。我一共花了四天来完成这个 Lab,平均每天用时 6h,收获颇多,便写此博客来记录学习 Minitorch 时的过程。
写完 Minitorch 了之后,我又看了一下跟 DLsys 相关的资料,还想要再多学习一下这背后的实现,于是新建了一个仓库,打算根据目前我的知识储备写一个自己的 Dlsys。
在床上思考了一个小时后,我偶然瞥见鬼泣5中尼禄 的英文名 Nero ,便想着要不起名为 Tero ,发现这个单词读起来并不舒服,便在后面加了一个 x,这样子末尾三个字母组合成了 rox,恰好是英雄联盟 s6 与 skt 在半决赛血战整个 Bo5 的传奇队伍,那是英雄联盟史上最为盛大的一次对决。想到这里,我赶忙翻身下床,创建了名为 Terox 的新仓库 。
Development Log 从 AutoDiff 开始 最开始我的打算是用 Python 实现自动微分,暂时先把功能跑起来,后续再用 Cython 重写。
考虑到后续 Scalar
和 Tensor
类都需要支持自动微分,我打算先实现一个能够做自动微分操作的基类 Variable
,这样子后续写 Scalar
和 Tensor
时就可以通过类的继承自动获得自动微分的能力。于是我对 Variable
类的定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class Variable (): _id :int _item:object _history: Optional ["VarHistory" ] _gradient: Optional ["Variable" ] _require_grad: bool def __init__ ( self, _history:Optional ["VarHistory" ]=None , _gradient:Optional ["Variable" ]=None , _require_grad:bool =True ) -> None : self._history = _history self._gradient = _gradient self._require_grad = _require_grad pass def ...
其中 VarHistory
也是一个类,里面存储了生成当前 Variable
的运算双方和运算函数信息,对于模型的参数或临时生成的变量而言没有运算信息(此时用 None
代替),相关定义如下:
1 2 3 4 5 6 7 8 9 10 11 class VarHistory (): _func: VarFunction _args: Iterable["Variable" ] def __init__ (self, _func:VarFunction, _args:Iterable["Variable" ] ) -> None : self._func = _func self._args = _args return def ...
VarFunction
也是一个类,它是所有需要追踪梯度的计算的基类,相关定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class VarFunction (): def __init__ (self ) -> None : return def __call__ (self, *args: Any , **kwds: Any ) -> "Variable" : return self._forward(*args, **kwds) def _forward (self, args:Iterable["Variable" ] ) -> "Variable" : raise NotImplementedError def _backward (self, grad:"object" , args:Iterable["Variable" ] ) -> Iterable["Variable" ]: raise NotImplementedError def ...
定义好这三个类之后,就正式开始了自动微分的编写。首先实现的是梯度反向计算的函数 _chainRule()
,这个函数会对当前的 Variable
变量做判断,如果没有运算的历史信息便退出计算,如果有历史信息,便会根据历史运算函数 信息调用对应的反向传播计算出梯度,并将梯度累加给父母变量 。
当一个 Variable
变量调用 backward()
函数时,会先初始化该变量的梯度为 oneGrad()
,因为对于不同的继承类都有不同代表 1 的实例,所以这个函数在目前是未实现的,需要继承类自行去重载。初始化梯度后,_getTopoChain
和 getTopoList
会根据当前的前向传播信息计算出一条满足拓扑排序 的变量反向传播顺序。其实这里也可以采用深度优先搜索 的思路来做反向传播,但这样子时间还有空间复杂度都会很高,虽然拓扑排序的实现较繁琐,但在运行效率上来看是值得的。
相关反向传播的函数写好之后,也无法得到测试。因为我是先实现反向传播,此时我还没有开始写前向传播的重载,所以并不清楚目前的实现正不正确,事实证明目前的实现确实有问题(后文会提及)。
实现 Scalar Scalar
由继承 Variable
得来,同时对 new()
、_zeroGrad()
、_oneGrad()
、zero()
、one()
、detach()
函数做重载,因为已经对 Variable
实现了反向传播的操作,所以这部分只需要关心 Scalar
类前向传播运算的实现即可虽然但是在这里遇到了之前写 bp 时的 bug。
前向传播运算的实现和重载是很简单的,所有的运算都大差不差,下面以计算两个 Scalar
相乘为例;首先需要先定义一个 Mul
类,它继承自 VarFunction
,_forward()
函数中先计算出对应的数值 _item
,然后根据父母变量的梯度标识来确定运算结果需不需要带上历史信息,最后将构造好的运算结果返回就好了。
反向传播就根据梯度计算规则 直接计算即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class Mul (VarFunction ): def __init__ (self ) -> None : super ().__init__() return def _forward (self, a:Variable, b:Variable ) -> Variable: _item = mul(a.item(), b.item()) _require_grad = a.getRequireGrad() and b.getRequireGrad() _history = VarHistory(self, (a, b)) if _require_grad else None res = a.new(_item, _history, None , _require_grad) return res def _backward (self, grad:Variable, args: Iterable[Variable] ) -> Iterable[Variable]: (a, b) = args a_grad, b_grad = grad * grad.new(b.item()), grad * grad.new(a.item()) return a_grad, b_grad
接下来将写好的 Mul
类重载到 Scalar
的 __mul__()
魔法函数,这样子当我们编写类似 Scalar(1.0) * Scalar(2.0)
的代码是,这里的乘法操作会被重定向到自己编写的相乘操作。
1 2 3 4 5 6 7 8 class ScalarOptsBackend (): def __init__ (self ) -> None : self.Mul:VarFunction = Mul() return class Scalar (Variable ): def __mul__ (self, b:"Scalar" ) -> "Scalar" : return self._backend.Mul(self, b)
对所有常见的操作都参照 Mul
操作来做重载,常见的操作包括但不限于 add
、sub
、mul
、div
、neg
、max
、min
、eq
、lt
、gt
、abs
、exp
、log
、relu
…
我编写了一些 test,来检测我写的 Scalar
前向传播和反向传播是否有误。然后确实发现了很多问题,包括但不限于拓扑排序写错、更新梯度时递归爆栈…
Parameter 做深度学习时,我们需要时刻追踪模型里的参数,所以我们需要对一个模型里的所有参数做管理,于是我定义了 Module
与 Parameter
类,Parameter
可以对 Variable
变量做追踪,Module
类可以对其下所有的 Parameter
做追踪,我对这两个类的定义如下,具体实现在这里就略过了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 parameter_count:int = 0 class Parameter (): _id : int _value: Variable def __init__ (self, _value:Variable ) -> None : pass def __str__ (self ) -> str : pass def __repr__ (self ) -> str : pass def value (self ) -> Variable: pass module_count:int = 0 class Module (): _id : int def __init__ (self ) -> None : pass def __call__ (self, *args: Any , **kwds: Any ) -> Any : pass def getParmeterDict (self ) -> Dict [str , Parameter]: pass def parmeters (self ) -> List [Parameter]: pass def __str__ (self ) -> str : pass def __repr__ (self ) -> str : pass def forward (self, *args: Any , **kwds: Any ) -> Any : pass
module_test
已经很好的说明了这两个类的用法,在这里不想赘述 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 class M1 (Module ): def __init__ (self, p1:Parameter, p2:Parameter ) -> None : super ().__init__() self.p1 = p1 self.p2 = p2 return class M2 (Module ): def __init__ (self, m1:M1, p3:Parameter ) -> None : super ().__init__() self.m1 = m1 self.p3 = p3 return p1 = Parameter(Scalar(1.0 )) p2 = Parameter(Scalar(2.0 )) p3 = Parameter(Scalar(3.0 )) m1 = M1(p1, p2) m2 = M2(m1, p3) @pytest.mark.test_module def test_module_dict () -> None : assert m1.getParmeterDict() == {"p1" :p1, "p2" :p2} assert m2.getParmeterDict() == {"m1.p1" :p1, "m1.p2" :p2, "p3" :p3} return @pytest.mark.test_module def test_module_parmeter () -> None : assert m1.parmeters() == [p1, p2] assert m2.parmeters() == [p1, p2, p3] return @pytest.mark.test_module def test_module_repr () -> None : assert str (m1) == """ M1{ p1:<Parameter<Scalar(1.0), grad_fn=None, grad=True>> p2:<Parameter<Scalar(2.0), grad_fn=None, grad=True>> } """ assert str (m2) == """ M2{ m1:M1{ p1:<Parameter<Scalar(1.0), grad_fn=None, grad=True>> p2:<Parameter<Scalar(2.0), grad_fn=None, grad=True>> } p3:<Parameter<Scalar(3.0), grad_fn=None, grad=True>> } """ return
Scalar Project 实现了 Scalar
、Parameter
和 Module
后,可以尝试写一个基于 Terox 的深度学习 pieline 了。
首先是数据生成的部分,考虑比较小的数据集,对 x-y 平面坐标里的点做二分类,若 x * x > y 则 label 为 0,反之为 1。
1 2 3 4 5 6 7 8 9 10 11 def getDataSet (num:int ) -> List : DATA = [] for _ in range (num): x = random() - 0.5 y = random() - 0.5 if x * x > y: label = 0 else : label = 1 DATA.append(([x, y], label)) return DATA
接着是模型层,因为目前的实现是针对一个标量(Scalar)做追踪,所以针对矩阵乘法的实现可能比较笨拙,下面是线性回归层的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 class ScalarLinear (Module ): def __init__ (self, in_feature:int , out_feature:int , bias:bool =True ) -> None : super ().__init__() self.in_feature = in_feature self.out_feature = out_feature self.bias = bias for i in range (in_feature): for j in range (out_feature): self.__dict__[f"w{i} {j} " ] = Parameter(Scalar(2 * (random() - 0.5 ))) if self.bias: for j in range (out_feature): self.__dict__[f"b{j} " ] = Parameter(Scalar(2 * (random() - 0.5 ))) return def forward (self, inputs:List [Scalar] ) -> List [Scalar]: out = [Scalar(0.0 ) for _ in range (self.out_feature)] for i in range (self.in_feature): for j in range (self.out_feature): out[j] += inputs[i] * self.__dict__[f"w{i} {j} " ].value() if self.bias: for j in range (self.out_feature): out[j] += self.__dict__[f"b{j} " ].value() return out
最后依次实现 classifymodel
、SGD
、MSELoss
,我们便写好了一个简易的深度学习流水线,将对 1000 个二维数据做二分类,使用高达两层 线性回归层的感知机,可训练参数高达 96 个浮点数,准确率跟上个hi达到了 98.20% 之高!bushi
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 N = 1000 EPOCHS = 100 LR = 5e-1 def test (model:Module, dataset:List ) -> float : Acc = 0 for inputs, labels in tqdm(dataset): inputs = [Scalar(num) for num in inputs] outpus = model(inputs) Acc += 1 if int (outpus[0 ].item() + 0.5 ) == labels else 0 acc = Acc / len (dataset) return acc if __name__ == "__main__" : dataset = getDataSet(N) model = ScalarIrisClassifyModel(in_feature=2 , hidden_feature=16 , out_feature=1 ) criterion = MSELoss optimizer = SGD(model.parmeters(), LR) print (f"[INFO] Start Acc:{test(model, dataset) * 100 :.2 f} %" ) for epoch in range (EPOCHS): Loss = 0.0 for inputs, labels in tqdm(dataset): inputs = [Scalar(num) for num in inputs] labels = [Scalar(labels)] outpus = model(inputs) optimizer.zero_grad() loss = criterion(outpus, labels) loss.backward() Loss += loss.item() optimizer.step() Loss /= len (dataset) Acc = test(model, dataset) print (f"[INFO] Epoch:{epoch} Loss:{Loss:.6 f} Acc:{Acc * 100 :.2 f} %" ) print ("Finished training!" )
结语 Terox 目前实现了自动微分和模型参数管理的功能,已经可以做简易的模型训练,最近估计会闲置这个项目一段时间。目前因为所有的计算都是标量计算,会产生大量的中间变量,等到一两周后再回过头来去实现 Tensor 的部分,能大大加速计算的效率。
回头还有两个人工智能实验课的 Lab 没做……再不做就要压 DDL 了,悲伤
这个系列下次再见,正如本篇名为 Terox DevLog-0 一样,下一篇应该叫 Terox DevLog-1 。